# -*- coding: utf-8 -*-
# Part of dtcloud. See LICENSE file for full copyright and licensing details.

from collections import defaultdict

import psycopg2

from dtcloud.exceptions import AccessError, MissingError
from dtcloud.tests.common import TransactionCase
from dtcloud.tools import mute_logger
from dtcloud import Command


class TestORM(TransactionCase):
    """ test special behaviors of ORM CRUD functions """

    @mute_logger('dtcloud.models')
    def test_access_deleted_records(self):
        """ Verify that accessing deleted records works as expected """
        c1 = self.env['res.partner.category'].create({'name': 'W'})
        c2 = self.env['res.partner.category'].create({'name': 'Y'})
        c1.unlink()

        # read() is expected to skip deleted records because our API is not
        # transactional for a sequence of search()->read() performed from the
        # client-side... a concurrent deletion could therefore cause spurious
        # exceptions even when simply opening a list view!
        # /!\ Using unprileged user to detect former side effects of ir.rules!
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [Command.set([self.ref('base.group_user')])],
        })
        cs = (c1 + c2).with_user(user)
        self.assertEqual([{'id': c2.id, 'name': 'Y'}], cs.read(['name']), "read() should skip deleted records")
        self.assertEqual([], cs[0].read(['name']), "read() should skip deleted records")

        # Deleting an already deleted record should be simply ignored
        self.assertTrue(c1.unlink(), "Re-deleting should be a no-op")

    @mute_logger('dtcloud.models')
    def test_access_partial_deletion(self):
        """ Check accessing a record from a recordset where another record has been deleted. """
        Model = self.env['res.country']
        self.assertTrue(type(Model).display_name.automatic, "test assumption not satisfied")

        # access regular field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name[0], 'code': name[1]} for name in (['Foo', 'ZV'], ['Bar', 'ZX'], ['Baz', 'ZY'])])
        for record in records:
            record.name
            record.unlink()

        # access computed field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name[0], 'code': name[1]} for name in (['Foo', 'ZV'], ['Bar', 'ZX'], ['Baz', 'ZY'])])
        for record in records:
            record.display_name
            record.unlink()

    @mute_logger('dtcloud.models', 'dtcloud.addons.base.models.ir_rule')
    def test_access_filtered_records(self):
        """ Verify that accessing filtered records works as expected for non-admin user """
        p1 = self.env['res.partner'].create({'name': 'W'})
        p2 = self.env['res.partner'].create({'name': 'Y'})
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [Command.set([self.ref('base.group_user')])],
        })

        partner_model = self.env['ir.model'].search([('model','=','res.partner')])
        self.env['ir.rule'].create({
            'name': 'Y is invisible',
            'domain_force': [('id', '!=', p1.id)],
            'model_id': partner_model.id,
        })

        # search as unprivileged user
        partners = self.env['res.partner'].with_user(user).search([])
        self.assertNotIn(p1, partners, "W should not be visible...")
        self.assertIn(p2, partners, "... but Y should be visible")

        # read as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).read(['name'])
        # write as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).write({'name': 'foo'})
        # unlink as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).unlink()

        # Prepare mixed case 
        p2.unlink()
        # read mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).read(['name'])
        # delete mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).unlink()

    def test_read(self):
        partner = self.env['res.partner'].create({'name': 'MyPartner1'})
        result = partner.read()
        self.assertIsInstance(result, list)

    @mute_logger('dtcloud.models')
    def test_search_read(self):
        partner = self.env['res.partner']

        # simple search_read
        partner.create({'name': 'MyPartner1'})
        found = partner.search_read([('name', '=', 'MyPartner1')], ['name'])
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertIn('id', found[0])

        # search_read correct order
        partner.create({'name': 'MyPartner2'})
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertEqual(found[1]['name'], 'MyPartner2')
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name desc")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner2')
        self.assertEqual(found[1]['name'], 'MyPartner1')

        # search_read that finds nothing
        found = partner.search_read([('name', '=', 'Does not exists')], ['name'])
        self.assertEqual(len(found), 0)

        # search_read with an empty array of fields
        found = partner.search_read([], [], limit=1)
        self.assertEqual(len(found), 1)
        self.assertTrue(field in list(found[0]) for field in ['id', 'name', 'display_name', 'email'])

        # search_read without fields
        found = partner.search_read([], False, limit=1)
        self.assertEqual(len(found), 1)
        self.assertTrue(field in list(found[0]) for field in ['id', 'name', 'display_name', 'email'])

    @mute_logger('dtcloud.sql_db')
    def test_exists(self):
        partner = self.env['res.partner']

        # check that records obtained from search exist
        recs = partner.search([])
        self.assertTrue(recs)
        self.assertEqual(recs.exists(), recs)

        # check that new records exist by convention
        recs = partner.new({})
        self.assertTrue(recs.exists())

        # check that there is no record with id 0
        recs = partner.browse([0])
        self.assertFalse(recs.exists())

    def test_groupby_date(self):
        partners_data = dict(
            A='2012-11-19',
            B='2012-12-17',
            C='2012-12-31',
            D='2013-01-07',
            E='2013-01-14',
            F='2013-01-28',
            G='2013-02-11',
        )

        partner_ids = []
        partner_ids_by_day = defaultdict(list)
        partner_ids_by_month = defaultdict(list)
        partner_ids_by_year = defaultdict(list)

        partners = self.env['res.partner']
        for name, date in partners_data.items():
            p = partners.create(dict(name=name, date=date))
            partner_ids.append(p.id)
            partner_ids_by_day[date].append(p.id)
            partner_ids_by_month[date.rsplit('-', 1)[0]].append(p.id)
            partner_ids_by_year[date.split('-', 1)[0]].append(p.id)

        def read_group(interval):
            domain = [('id', 'in', partner_ids)]
            result = {}
            for grp in partners.read_group(domain, ['date'], ['date:' + interval]):
                result[grp['date:' + interval]] = partners.search(grp['__domain'])
            return result

        self.assertEqual(len(read_group('day')), len(partner_ids_by_day))
        self.assertEqual(len(read_group('month')), len(partner_ids_by_month))
        self.assertEqual(len(read_group('year')), len(partner_ids_by_year))

        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  ['date:month', 'date:day'], lazy=False)
        self.assertEqual(len(res), len(partner_ids))

        # combine groupby and orderby
        months = ['February 2013', 'January 2013', 'December 2012', 'November 2012']
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month'], orderby='date:month DESC')
        self.assertEqual([item['date:month'] for item in res], months)

        # order by date should reorder by date:month
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month'], orderby='date DESC')
        self.assertEqual([item['date:month'] for item in res], months)

        # order by date should reorder by date:day
        days = ['11 Feb 2013', '28 Jan 2013', '14 Jan 2013', '07 Jan 2013',
                '31 Dec 2012', '17 Dec 2012', '19 Nov 2012']
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month', 'date:day'],
                                  orderby='date DESC', lazy=False)
        self.assertEqual([item['date:day'] for item in res], days)

    def test_write_duplicate(self):
        p1 = self.env['res.partner'].create({'name': 'W'})
        (p1 + p1).write({'name': 'X'})

    def test_m2m_store_trigger(self):
        group_user = self.env.ref('base.group_user')

        user = self.env['res.users'].create({
            'name': 'test',
            'login': 'test_m2m_store_trigger',
            'groups_id': [Command.set([])],
        })
        self.assertTrue(user.share)

        group_user.write({'users': [Command.link(user.id)]})
        self.assertFalse(user.share)

        group_user.write({'users': [Command.unlink(user.id)]})
        self.assertTrue(user.share)

    @mute_logger('dtcloud.models')
    def test_unlink_with_property(self):
        """ Verify that unlink removes the related ir.property as unprivileged user """
        user = self.env['res.users'].create({
            'name': 'Justine Bridou',
            'login': 'saucisson',
            'groups_id': [Command.set([self.ref('base.group_partner_manager')])],
        })
        p1 = self.env['res.partner'].with_user(user).create({'name': 'Zorro'})
        self.env['ir.property'].with_user(user)._set_multi("ref", "res.partner", {p1.id: "Nain poilu"})
        p1_prop = self.env['ir.property'].with_user(user)._get("ref", "res.partner", res_id=p1.id)
        self.assertEqual(
            p1_prop, "Nain poilu", 'p1_prop should have been created')

        # Unlink with unprivileged user
        p1.unlink()

        # ir.property is deleted
        p1_prop = self.env['ir.property'].with_user(user)._get("ref", "res.partner", res_id=p1.id)
        self.assertEqual(
            p1_prop, False, 'p1_prop should have been deleted')

    def test_create_multi(self):
        """ create for multiple records """
        # assumption: 'res.bank' does not override 'create'
        vals_list = [{'name': name} for name in ('Foo', 'Bar', 'Baz')]
        vals_list[0]['email'] = 'foo@example.com'
        for vals in vals_list:
            record = self.env['res.bank'].create(vals)
            self.assertEqual(len(record), 1)
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        records = self.env['res.bank'].create([])
        self.assertFalse(records)

        records = self.env['res.bank'].create(vals_list)
        self.assertEqual(len(records), len(vals_list))
        for record, vals in zip(records, vals_list):
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # create countries and states
        vals_list = [{
            'name': 'Foo',
            'state_ids': [
                Command.create({'name': 'North Foo', 'code': 'NF'}),
                Command.create({'name': 'South Foo', 'code': 'SF'}),
                Command.create({'name': 'West Foo', 'code': 'WF'}),
                Command.create({'name': 'East Foo', 'code': 'EF'}),
            ],
            'code': 'ZV',
        }, {
            'name': 'Bar',
            'state_ids': [
                Command.create({'name': 'North Bar', 'code': 'NB'}),
                Command.create({'name': 'South Bar', 'code': 'SB'}),
            ],
            'code': 'ZX',
        }]
        foo, bar = self.env['res.country'].create(vals_list)
        self.assertEqual(foo.name, 'Foo')
        self.assertCountEqual(foo.mapped('state_ids.code'), ['NF', 'SF', 'WF', 'EF'])
        self.assertEqual(bar.name, 'Bar')
        self.assertCountEqual(bar.mapped('state_ids.code'), ['NB', 'SB'])


class TestInherits(TransactionCase):
    """ test the behavior of the orm for models that use _inherits;
        specifically: res.users, that inherits from res.partner
    """

    def test_default(self):
        """ `default_get` cannot return a dictionary or a new id """
        defaults = self.env['res.users'].default_get(['partner_id'])
        if 'partner_id' in defaults:
            self.assertIsInstance(defaults['partner_id'], (bool, int))

    def test_create(self):
        """ creating a user should automatically create a new partner """
        partners_before = self.env['res.partner'].search([])
        user_foo = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})

        self.assertNotIn(user_foo.partner_id, partners_before)

    def test_create_with_ancestor(self):
        """ creating a user with a specific 'partner_id' should not create a new partner """
        partner_foo = self.env['res.partner'].create({'name': 'Foo'})
        partners_before = self.env['res.partner'].search([])
        user_foo = self.env['res.users'].create({'partner_id': partner_foo.id, 'login': 'foo'})
        partners_after = self.env['res.partner'].search([])

        self.assertEqual(partners_before, partners_after)
        self.assertEqual(user_foo.name, 'Foo')
        self.assertEqual(user_foo.partner_id, partner_foo)

    @mute_logger('dtcloud.models')
    def test_read(self):
        """ inherited fields should be read without any indirection """
        user_foo = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})
        user_values, = user_foo.read()
        partner_values, = user_foo.partner_id.read()

        self.assertEqual(user_values['name'], partner_values['name'])
        self.assertEqual(user_foo.name, user_foo.partner_id.name)

    @mute_logger('dtcloud.models')
    def test_copy(self):
        """ copying a user should automatically copy its partner, too """
        user_foo = self.env['res.users'].create({
            'name': 'Foo',
            'login': 'foo',
            'employee': True,
        })
        foo_before, = user_foo.read()
        del foo_before['create_date']
        del foo_before['write_date']
        user_bar = user_foo.copy({'login': 'bar'})
        foo_after, = user_foo.read()
        del foo_after['create_date']
        del foo_after['write_date']
        self.assertEqual(foo_before, foo_after)

        self.assertEqual(user_bar.name, 'Foo (copy)')
        self.assertEqual(user_bar.login, 'bar')
        self.assertEqual(user_foo.employee, user_bar.employee)
        self.assertNotEqual(user_foo.id, user_bar.id)
        self.assertNotEqual(user_foo.partner_id.id, user_bar.partner_id.id)

    @mute_logger('dtcloud.models')
    def test_copy_with_ancestor(self):
        """ copying a user with 'parent_id' in defaults should not duplicate the partner """
        user_foo = self.env['res.users'].create({'login': 'foo', 'name': 'Foo', 'signature': 'Foo'})
        partner_bar = self.env['res.partner'].create({'name': 'Bar'})

        foo_before, = user_foo.read()
        del foo_before['create_date']
        del foo_before['write_date']
        del foo_before['login_date']
        partners_before = self.env['res.partner'].search([])
        user_bar = user_foo.copy({'partner_id': partner_bar.id, 'login': 'bar'})
        foo_after, = user_foo.read()
        del foo_after['create_date']
        del foo_after['write_date']
        del foo_after['login_date']
        partners_after = self.env['res.partner'].search([])

        self.assertEqual(foo_before, foo_after)
        self.assertEqual(partners_before, partners_after)

        self.assertNotEqual(user_foo.id, user_bar.id)
        self.assertEqual(user_bar.partner_id.id, partner_bar.id)
        self.assertEqual(user_bar.login, 'bar', "login is given from copy parameters")
        self.assertFalse(user_bar.password, "password should not be copied from original record")
        self.assertEqual(user_bar.name, 'Bar', "name is given from specific partner")
        self.assertEqual(user_bar.signature, user_foo.signature, "signature should be copied")

    @mute_logger('dtcloud.models')
    def test_write_date(self):
        """ modifying inherited fields must update write_date """
        user = self.env.user
        write_date_before = user.write_date

        # write base64 image
        user.write({'image_1920': 'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw=='})
        write_date_after = user.write_date
        self.assertNotEqual(write_date_before, write_date_after)
